package com.example.zookeeper.config.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

/**
 * Starts, stops and inspects Kafka listener containers, addressed by their listener id,
 * through the {@link KafkaListenerEndpointRegistry}.
 */
@Component
public class KafkaContainerListener {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * Starts listening on the given listener.
     *
     * <p>The container is started if it is not running, and {@code resume()} is then
     * always invoked on it.
     *
     * @param listenerId id of the listener whose container should be started
     * @throws IllegalArgumentException if no container is registered under {@code listenerId}
     */
    public void startListener(String listenerId) {
        MessageListenerContainer container = requireContainer(listenerId);
        // resume() only resumes a container; it does not start one that is not running,
        // so a stopped container has to be started explicitly first.
        if (!container.isRunning()) {
            container.start();
        }
        container.resume();
    }

    /**
     * Stops listening on the given listener. Does nothing if its container is not running.
     *
     * @param listenerId id of the listener whose container should be stopped
     * @throws IllegalArgumentException if no container is registered under {@code listenerId}
     */
    public void stopListener(String listenerId) {
        MessageListenerContainer container = requireContainer(listenerId);
        if (container.isRunning()) {
            container.stop();
        }
    }

    /**
     * Reports whether the container of the given listener is currently running.
     *
     * @param listenerId id of the listener to inspect
     * @return {@code true} if the container is running, {@code false} otherwise
     * @throws IllegalArgumentException if no container is registered under {@code listenerId}
     */
    public boolean isContainerRunning(String listenerId) {
        return requireContainer(listenerId).isRunning();
    }

    /**
     * Looks up the container for a listener id, failing with a descriptive error instead of
     * letting callers hit a NullPointerException when the id is unknown.
     */
    private MessageListenerContainer requireContainer(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null) {
            throw new IllegalArgumentException(
                    "No Kafka listener container registered with id '" + listenerId + "'");
        }
        return container;
    }

}
